﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using OF.DistributeService.Core.Common;
using OF.Notify.DataHost.Cluster;
using OF.Notify.Entity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ZooKeeperNet;

namespace OF.Notify.Master.Zookeeper
{
    /// <summary>
    /// Hands out sequential cluster ids and data-collection ids for a single topic.
    /// Each counter is stored as a 4-byte integer in its own ZooKeeper node.
    /// </summary>
    public class TopicClusterIdProvider
    {
        internal ZooKeeperProxy zk;
        internal byte topicEnum;

        internal string clusterIdPath;
        public string dataCollectionIdPath;
        internal TopicClusterMaster topicClusterMaster;

        // Private gate serializing the read/modify/write sequences against ZooKeeper.
        // A dedicated object is used instead of `this` so that code outside this class
        // cannot accidentally contend on (or deadlock with) the same monitor.
        private readonly object zkSyncRoot = new object();

        /// <summary>
        /// Creates a provider bound to one topic.
        /// </summary>
        /// <param name="topicClusterMaster">Owner whose <c>clusterMaster</c> is consulted for disposal state.</param>
        /// <param name="zk">ZooKeeper proxy used for all node access.</param>
        /// <param name="topicEnum">Topic identifier; appended to the configured base paths.</param>
        public TopicClusterIdProvider(TopicClusterMaster topicClusterMaster, ZooKeeperProxy zk, byte topicEnum)
        {
            this.topicClusterMaster = topicClusterMaster;
            this.zk = zk;
            this.topicEnum = topicEnum;

            clusterIdPath = NotifyConfig.ClusterIdPath + "/" + this.topicEnum;
            dataCollectionIdPath = NotifyConfig.DataCollectionIdPath + "/" + this.topicEnum;
        }

        /// <summary>
        /// Returns everything before the last '/' of <paramref name="zkPath"/>.
        /// </summary>
        /// <param name="zkPath">A slash-separated ZooKeeper path.</param>
        /// <returns>The parent path (empty string when the only '/' is the leading one).</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Thrown by <see cref="string.Substring(int, int)"/> when the path contains no '/'.
        /// </exception>
        internal string GetParentPath(string zkPath)
        {
            // Char overload: ordinal search, independent of the current culture.
            int lastSlash = zkPath.LastIndexOf('/');
            return zkPath.Substring(0, lastSlash);
        }

        /// <summary>
        /// Increments and returns the cluster-id counter of this topic.
        /// </summary>
        /// <returns>
        /// The new id, or <c>ClusterContext.InvalidateId</c> when no id was obtained
        /// (for example because the cluster master is already disposed).
        /// </returns>
        public int GetNextClusterId()
        {
            return GetNextIdWithRetry(clusterIdPath);
        }

        /// <summary>
        /// Increments and returns the data-collection-id counter of this topic.
        /// </summary>
        /// <returns>
        /// The new id, or <c>ClusterContext.InvalidateId</c> when no id was obtained
        /// (for example because the cluster master is already disposed).
        /// </returns>
        public int GetNextDataCollectionId()
        {
            return GetNextIdWithRetry(dataCollectionIdPath);
        }

        /// <summary>
        /// Reads the current data-collection-id counter without modifying it.
        /// </summary>
        /// <returns>The stored value, or 0 when the counter node does not exist.</returns>
        public int GetMaxDataCollectionId()
        {
            return GetMaxIdFromZK(dataCollectionIdPath);
        }

        /// <summary>
        /// Reads the counter stored at <paramref name="zkPath"/>.
        /// </summary>
        /// <param name="zkPath">Path of the counter node.</param>
        /// <returns>The stored integer, or 0 when the node does not exist.</returns>
        /// <remarks>
        /// Any exception is rethrown; before rethrowing, the cluster master is disposed
        /// if the ZooKeeper proxy reports itself as disposed.
        /// </remarks>
        internal int GetMaxIdFromZK(string zkPath)
        {
            try
            {
                lock (zkSyncRoot)
                {
                    // The parent node is ensured even for a pure read, as in the original flow.
                    zk.SafeCreatePath(GetParentPath(zkPath), ZooKeeperProxy.EmptyBuffer);
                    var nodeStat = zk.Exists(zkPath, false);
                    if (nodeStat == null)
                    {
                        // Counter was never created: report the initial value.
                        return 0;
                    }

                    byte[] stored = zk.GetData(zkPath, false, nodeStat);
                    return BitConverter.ToInt32(stored, 0);
                }
            }
            catch
            {
                DisposeMasterIfZkDisposed();
                throw;
            }
        }

        /// <summary>
        /// Advances the counter stored at <paramref name="zkPath"/> and returns the new value.
        /// </summary>
        /// <param name="zkPath">Path of the counter node.</param>
        /// <returns>
        /// 0 when the node had to be created (first call); otherwise the previous value plus one.
        /// </returns>
        /// <remarks>
        /// The write passes the node version taken from the stat object, so ZooKeeper can
        /// reject it if another writer changed the node in between.
        /// NOTE(review): the stat passed to <c>GetData</c> is presumably refreshed by that call — confirm.
        /// Any exception is rethrown; before rethrowing, the cluster master is disposed
        /// if the ZooKeeper proxy reports itself as disposed.
        /// </remarks>
        internal int GetNextIdFromZK(string zkPath)
        {
            try
            {
                lock (zkSyncRoot)
                {
                    zk.SafeCreatePath(GetParentPath(zkPath), ZooKeeperProxy.EmptyBuffer);
                    var nodeStat = zk.Exists(zkPath, false);
                    if (nodeStat == null)
                    {
                        // First use: create the counter node holding the initial value.
                        int initialValue = 0;
                        zk.Create(zkPath, BitConverter.GetBytes(initialValue), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
                        return initialValue;
                    }

                    byte[] stored = zk.GetData(zkPath, false, nodeStat);
                    int expectedVersion = nodeStat.Version;
                    int nextValue = BitConverter.ToInt32(stored, 0) + 1;
                    // Conditional write keyed on the version read above.
                    zk.SetData(zkPath, BitConverter.GetBytes(nextValue), expectedVersion);
                    return nextValue;
                }
            }
            catch
            {
                DisposeMasterIfZkDisposed();
                throw;
            }
        }

        /// <summary>
        /// Shared implementation of the public "next id" methods: runs
        /// <see cref="GetNextIdFromZK"/> through <c>Util.SafeLoopUtilTrue</c> with the
        /// retry count and sleep interval taken from the cluster context.
        /// </summary>
        /// <param name="zkPath">Path of the counter node to advance.</param>
        /// <returns>The new id, or <c>ClusterContext.InvalidateId</c> when none was obtained.</returns>
        private int GetNextIdWithRetry(string zkPath)
        {
            var context = ClusterContext.Get();
            int result = ClusterContext.InvalidateId;
            // NOTE(review): SafeLoopUtilTrue presumably re-invokes the callback after a failure
            // until it returns true or the retry budget is used up — confirm against Util.
            Util.SafeLoopUtilTrue((i1) =>
            {
                if (topicClusterMaster.clusterMaster.isDisposed)
                {
                    // Stop looping; the caller receives the invalid-id sentinel.
                    return true;
                }
                result = GetNextIdFromZK(zkPath);
                return true;
            }, context.GetMaxRetryZookeeperCount(), context.GetRetryFailSleepMS());
            return result;
        }

        /// <summary>
        /// Disposes the cluster master when the ZooKeeper proxy reports itself as disposed.
        /// Called from the catch blocks of the ZooKeeper accessors before they rethrow.
        /// </summary>
        private void DisposeMasterIfZkDisposed()
        {
            if (zk.IsDisposed())
            {
                topicClusterMaster.clusterMaster.DoDispose();
            }
        }
    }
}
